package com.zc.batch;

import com.zc.model.Person;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisBatchItemWriter;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecutionListener;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.support.CompositeItemWriter;
import org.springframework.batch.support.DatabaseType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;
import java.util.ArrayList;
import java.util.List;

/**
 * Spring Batch configuration for the person import job.
 *
 * <p>Wires a flat-file reader, a {@link PersonItemProcessor} and a MyBatis batch writer
 * into a single chunk-oriented step ({@code personStep}) that is run by the
 * {@code importJob} job.
 */
@Configuration
@EnableBatchProcessing // enable Spring Batch processing
public class BatchConfig {

    /** MyBatis session factory handed to the item writer. */
    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Creates the job repository.
     *
     * <p>The database type is fixed to MySQL.
     *
     * @param dataSource         data source the repository is configured with
     * @param transactionManager transaction manager the repository is configured with
     * @return the repository produced by the factory bean
     * @throws Exception if the factory bean fails to build the repository
     */
    @Bean
    public JobRepository jobRepository(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
        factory.setDataSource(dataSource);
        factory.setTransactionManager(transactionManager);
        factory.setDatabaseType(DatabaseType.MYSQL.name());

        return factory.getObject();
    }

    /**
     * Creates a job launcher backed by {@link #jobRepository(DataSource, PlatformTransactionManager)}.
     *
     * <p>NOTE(review): this method carries no {@code @Bean} annotation, so it is not
     * registered in the application context by this class - confirm whether that is
     * intentional. Because the container therefore never runs the launcher's
     * initialization callback, it is invoked explicitly here so that the returned
     * launcher has its default task executor and is ready to use.
     *
     * @param dataSource         data source passed on to the job repository
     * @param transactionManager transaction manager passed on to the job repository
     * @return an initialized launcher
     * @throws Exception if the repository cannot be created or the launcher fails to initialize
     */
    public SimpleJobLauncher jobLauncher(DataSource dataSource, PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(this.jobRepository(dataSource, transactionManager));
        // Not container-managed here, so apply defaults (task executor) ourselves.
        jobLauncher.afterPropertiesSet();

        return jobLauncher;
    }

    /**
     * 1. Read data: reads {@link Person} records from a CSV file on the classpath.
     *
     * <p>Each line is tokenized into the columns {@code personName}, {@code personAge}
     * and {@code personSex}, which are then bound to a {@link Person} instance.
     *
     * @return the configured flat-file reader
     */
    @Bean
    public ItemReader<Person> reader() {
        // Column names, in file order.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "personName", "personAge", "personSex" });

        BeanWrapperFieldSetMapper<Person> fieldSetMapper = new BeanWrapperFieldSetMapper<Person>();
        fieldSetMapper.setTargetType(Person.class);

        DefaultLineMapper<Person> lineMapper = new DefaultLineMapper<Person>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper(fieldSetMapper);

        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        // Load the external data file (type: CSV).
        reader.setResource(new ClassPathResource("euzd_20170724001.csv"));
        reader.setLineMapper(lineMapper);
        return reader;
    }

    /**
     * 2. Process data.
     *
     * @return a new {@link PersonItemProcessor}
     */
    @Bean
    public PersonItemProcessor processor() {
        return new PersonItemProcessor();
    }

    /**
     * 3. Write data: writes {@link Person} items through the MyBatis statement
     * {@code com.zc.mapper.PersonMapper.insert}.
     *
     * @return the configured MyBatis batch writer
     */
    @Bean
    public ItemWriter<Person> writer() {
        MyBatisBatchItemWriter<Person> writer = new MyBatisBatchItemWriter<Person>();
        writer.setSqlSessionFactory(sqlSessionFactory);
        writer.setStatementId("com.zc.mapper.PersonMapper.insert");
        // Turn off the writer's update assertion.
        writer.setAssertUpdates(false);
        return writer;
    }

    /**
     * Defines the {@code importJob} job: a single flow consisting of {@code personStep},
     * with a {@link RunIdIncrementer} and the supplied execution listener attached.
     *
     * @param jobs       factory used to build the job
     * @param personStep the step the job runs
     * @param listener   listener registered on the job
     * @return the assembled job
     */
    @Bean
    public Job importJob(JobBuilderFactory jobs, Step personStep, JobExecutionListener listener) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener)
                .flow(personStep)
                .end()
                .build();
    }

    /**
     * Defines the {@code personStep} step: a chunk-oriented step with a chunk size of
     * 5000 that connects the given reader, processor and writer.
     *
     * @param stepBuilderFactory factory used to build the step
     * @param reader             source of {@link Person} items
     * @param writer             sink for processed {@link Person} items
     * @param processor          processor applied to each item between read and write
     * @return the assembled step
     */
    @Bean
    public Step personStep(StepBuilderFactory stepBuilderFactory,
                           ItemReader<Person> reader, ItemWriter<Person> writer, ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory.get("personStep")
                .<Person, Person>chunk(5000)
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

}